{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "6ec3e892",
   "metadata": {},
   "source": [
    "# 生成式预训练语言模型：理论与实战\n",
    "深蓝学院 课程 \n",
    "课程链接：https://www.shenlanxueyuan.com/course/620\n",
    "\n",
    "作者 **黄佳**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "ae951534",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "句子数量： 5\n",
      "中文词汇表大小： 19\n",
      "英文词汇表大小： 21\n",
      "中文词汇到索引的字典： {'改变': 0, '语言': 1, '小冰': 2, '处理': 3, '很': 4, '爱': 5, '我': 6, '复杂': 7, '深度': 8, '学习': 9, '世界': 10, '喜欢': 11, '<pad>': 12, '非常': 13, '自然': 14, '人工智能': 15, '咖哥': 16, '神经网络': 17, '强大': 18}\n",
      "英文词汇到索引的字典： {'XiaoBing': 0, 'is': 1, 'NLP': 2, 'Net': 3, 'Deep': 4, '<sos>': 5, 'learning': 6, 'so': 7, 'are': 8, 'YYDS': 9, 'love': 10, 'AI': 11, '<eos>': 12, 'complex': 13, 'likes': 14, 'KaGe': 15, 'I': 16, 'studying': 17, 'Neural': 18, '<pad>': 19, 'powerful': 20}\n"
     ]
    }
   ],
   "source": [
    "# 构建训练句子集，每一个句子包含中文，英文（解码器输入）和翻译成英文后目标输出三个部分\n",
    "sentences = [\n",
    "    ['咖哥 喜欢 小冰 <pad> <pad>', '<sos> KaGe likes XiaoBing <pad>', 'KaGe likes XiaoBing <pad> <eos>'],\n",
    "    ['我 爱 学习 人工智能 <pad>', '<sos> I love studying AI', 'I love studying AI <eos>'],\n",
    "    ['深度 学习 改变 世界 <pad>', '<sos> Deep learning YYDS <pad>', 'Deep learning YYDS <pad> <eos>'],\n",
    "    ['自然 语言 处理 很 强大', '<sos> NLP is so powerful', 'NLP is so powerful <eos>'],\n",
    "    ['神经网络 非常 复杂 <pad> <pad>', '<sos> Neural Net are complex', 'Neural Net are complex <eos>']]\n",
    "\n",
    "word_list_cn, word_list_en = [], []  # 初始化中英文单词列表\n",
    "# 遍历每一个句子并将单词添加到单词列表中\n",
    "for s in sentences:\n",
    "    word_list_cn.extend(s[0].split())\n",
    "    word_list_en.extend(s[1].split())\n",
    "    word_list_en.extend(s[2].split())\n",
    "\n",
    "# 去重得到不重复的单词列表\n",
    "word_list_cn = list(set(word_list_cn))\n",
    "word_list_en = list(set(word_list_en))\n",
    "\n",
    "# 构建单词到索引的映射\n",
    "word2idx_cn = {w: i for i, w in enumerate(word_list_cn)}\n",
    "word2idx_en = {w: i for i, w in enumerate(word_list_en)}\n",
    "\n",
    "# 构建索引到单词的映射\n",
    "idx2word_cn = {i: w for i, w in enumerate(word_list_cn)}\n",
    "idx2word_en = {i: w for i, w in enumerate(word_list_en)}\n",
    "\n",
    "# 计算词汇表的大小\n",
    "voc_size_cn = len(word_list_cn)\n",
    "voc_size_en = len(word_list_en)\n",
    "\n",
    "print(\"句子数量：\", len(sentences)) # 打印句子数\n",
    "print(\"中文词汇表大小：\", voc_size_cn) #打印中文词汇表大小\n",
    "print(\"英文词汇表大小：\", voc_size_en) #打印英文词汇表大小\n",
    "print(\"中文词汇到索引的字典：\", word2idx_cn) # 中文词汇到索引\n",
    "print(\"英文词汇到索引的字典：\", word2idx_en) # 英文词汇到索引"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "84507925",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "原始句子: ['自然 语言 处理 很 强大', '<sos> NLP is so powerful', 'NLP is so powerful <eos>']\n",
      "编码器输入张量的形状: torch.Size([1, 5])\n",
      "解码器输入张量的形状: torch.Size([1, 5])\n",
      "目标张量的形状: torch.Size([1, 5])\n",
      "编码器输入张量: tensor([[14,  1,  3,  4, 18]])\n",
      "解码器输入张量: tensor([[ 5,  2,  1,  7, 20]])\n",
      "目标张量: tensor([[ 2,  1,  7, 20, 12]])\n"
     ]
    }
   ],
   "source": [
    "import numpy as np # 导入numpy\n",
    "import torch # 导入torch\n",
    "import random # 导入random库\n",
    "# 定义一个函数，随机选择一个句子和词典生成输入、输出和目标数据\n",
    "def make_data(sentences):\n",
    "    # 随机选择一个句子进行训练\n",
    "    random_sentence = random.choice(sentences)    \n",
    "    # 将输入句子中的单词转换为对应的索引\n",
    "    encoder_input = np.array([[word2idx_cn[n] for n in random_sentence[0].split()]])\n",
    "    # 将输出句子中的单词转换为对应的索引\n",
    "    decoder_input = np.array([[word2idx_en[n] for n in random_sentence[1].split()]])\n",
    "    # 将目标句子中的单词转换为对应的索引\n",
    "    target = np.array([[word2idx_en[n] for n in random_sentence[2].split()]])\n",
    "    # 将输入、输出和目标批次转换为LongTensor\n",
    "    encoder_input = torch.LongTensor(encoder_input)\n",
    "    decoder_input = torch.LongTensor(decoder_input)\n",
    "    target = torch.LongTensor(target)\n",
    "    return encoder_input, decoder_input, target\n",
    "\n",
    "# 使用make_data函数生成输入、输出和目标张量\n",
    "encoder_input, decoder_input, target = make_data(sentences)\n",
    "for s in sentences: # 获取原始句子\n",
    "    if all([word2idx_cn[w] in encoder_input[0] for w in s[0].split()]):\n",
    "        original_sentence = s\n",
    "        break\n",
    "print(\"原始句子:\", original_sentence) # 打印原始句子\n",
    "print(\"编码器输入张量的形状:\", encoder_input.shape)  # 打印输入张量形状\n",
    "print(\"解码器输入张量的形状:\", decoder_input.shape) # 打印输出张量形状\n",
    "print(\"目标张量的形状:\", target.shape) # 打印目标张量形状\n",
    "print(\"编码器输入张量:\", encoder_input) # 打印输入张量\n",
    "print(\"解码器输入张量:\", decoder_input) # 打印输出张量\n",
    "print(\"目标张量:\", target) # 打印目标张量"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "ee814517",
   "metadata": {},
   "outputs": [],
   "source": [
    "def make_batch(sentences, batch_size=2):\n",
    "    # 随机选择batch_size个句子进行训练\n",
    "    random_sentences = random.sample(sentences, batch_size)\n",
    "    \n",
    "    # 初始化encoder_input, decoder_input, target的列表\n",
    "    encoder_input_list, decoder_input_list, target_list = [], [], []\n",
    "\n",
    "    # 遍历随机选择的句子\n",
    "    for random_sentence in random_sentences:\n",
    "        # 将输入句子中的单词转换为对应的索引\n",
    "        encoder_input_list.append([word2idx_cn[n] for n in random_sentence[0].split()])\n",
    "        # 将输出句子中的单词转换为对应的索引\n",
    "        decoder_input_list.append([word2idx_en[n] for n in random_sentence[1].split()])\n",
    "        # 将目标句子中的单词转换为对应的索引\n",
    "        target_list.append([word2idx_en[n] for n in random_sentence[2].split()])\n",
    "\n",
    "    # 将列表转换为numpy数组\n",
    "    encoder_input = np.array(encoder_input_list)\n",
    "    decoder_input = np.array(decoder_input_list)\n",
    "    target = np.array(target_list)\n",
    "\n",
    "    # 将输入、输出和目标批次转换为LongTensor\n",
    "    encoder_input = torch.LongTensor(encoder_input)\n",
    "    decoder_input = torch.LongTensor(decoder_input)\n",
    "    target = torch.LongTensor(target)\n",
    "    return encoder_input, decoder_input, target"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "1b01a076",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "编码器结构： Encoder(\n",
      "  (embedding): Embedding(19, 128)\n",
      "  (rnn): RNN(128, 128, batch_first=True)\n",
      ")\n",
      "解码器结构： Decoder(\n",
      "  (embedding): Embedding(21, 128)\n",
      "  (rnn): RNN(128, 128, batch_first=True)\n",
      "  (out): Linear(in_features=128, out_features=21, bias=True)\n",
      ")\n"
     ]
    }
   ],
   "source": [
    "import torch.nn as nn # 导入torch.nn库\n",
    "# 定义编码器类，继承自nn.Module\n",
    "class Encoder(nn.Module):\n",
    "    def __init__(self, input_size, hidden_size):\n",
    "        super(Encoder, self).__init__()       \n",
    "        self.hidden_size = hidden_size # 设置隐藏层大小       \n",
    "        self.embedding = nn.Embedding(input_size, hidden_size) # 创建词嵌入层       \n",
    "        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True) # 创建RNN层\n",
    "    # 定义前向传播函数\n",
    "    def forward(self, inputs, hidden):\n",
    "        embedded = self.embedding(inputs) # 将输入转换为嵌入向量       \n",
    "        output, hidden = self.rnn(embedded, hidden) # 将嵌入向量输入RNN层并获取输出\n",
    "        return output, hidden\n",
    "\n",
    "# 定义解码器类，继承自nn.Module\n",
    "class Decoder(nn.Module):\n",
    "    def __init__(self, hidden_size, output_size):\n",
    "        super(Decoder, self).__init__()       \n",
    "        self.hidden_size = hidden_size # 设置隐藏层大小       \n",
    "        self.embedding = nn.Embedding(output_size, hidden_size) # 创建词嵌入层\n",
    "        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)  # 创建RNN层       \n",
    "        self.out = nn.Linear(hidden_size, output_size) # 创建线性输出层\n",
    "    # 定义前向传播函数\n",
    "    def forward(self, inputs, hidden):       \n",
    "        embedded = self.embedding(inputs) # 将输入转换为嵌入向量       \n",
    "        output, hidden = self.rnn(embedded, hidden) # 将嵌入向量输入RNN层并获取输出       \n",
    "        output = self.out(output) # 使用线性层生成最终输出\n",
    "        return output, hidden\n",
    "\n",
    "device = 'cuda' if torch.cuda.is_available() else 'cpu' # 设置设备，检查是否有GPU\n",
    "n_hidden = 128 # 设置隐藏层数量\n",
    "\n",
    "# 创建编码器和解码器\n",
    "encoder = Encoder(voc_size_cn, n_hidden)\n",
    "decoder = Decoder(n_hidden, voc_size_en)\n",
    "print('编码器结构：', encoder)  # 打印编码器的结构\n",
    "print('解码器结构：', decoder)  # 打印解码器的结构"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "9b195375",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "S2S模型结构： Seq2Seq(\n",
      "  (encoder): Encoder(\n",
      "    (embedding): Embedding(19, 128)\n",
      "    (rnn): RNN(128, 128, batch_first=True)\n",
      "  )\n",
      "  (decoder): Decoder(\n",
      "    (embedding): Embedding(21, 128)\n",
      "    (rnn): RNN(128, 128, batch_first=True)\n",
      "    (out): Linear(in_features=128, out_features=21, bias=True)\n",
      "  )\n",
      ")\n"
     ]
    }
   ],
   "source": [
    "class Seq2Seq(nn.Module):\n",
    "    def __init__(self, encoder, decoder):\n",
    "        super(Seq2Seq, self).__init__()\n",
    "        # 初始化编码器和解码器\n",
    "        self.encoder = encoder\n",
    "        self.decoder = decoder\n",
    "    # 定义前向传播函数\n",
    "    def forward(self, encoder_input, hidden, decoder_input):\n",
    "        # 将输入序列通过编码器并获取输出和隐藏状态\n",
    "        encoder_output, encoder_hidden = self.encoder(encoder_input, hidden)\n",
    "        # 将编码器的隐藏状态传递给解码器作为初始隐藏状态\n",
    "        decoder_hidden = encoder_hidden\n",
    "        # 将目标序列通过解码器并获取输出\n",
    "        decoder_output, _ = self.decoder(decoder_input, decoder_hidden)\n",
    "        return decoder_output\n",
    "\n",
    "# 创建Seq2Seq模型\n",
    "model = Seq2Seq(encoder, decoder)\n",
    "print('S2S模型结构：', model)  # 打印模型的结构"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "876e5ff9",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Epoch: 0100 cost = 0.032659\n",
      "Epoch: 0200 cost = 0.011341\n",
      "Epoch: 0300 cost = 0.006474\n",
      "Epoch: 0400 cost = 0.004237\n",
      "Epoch: 0500 cost = 0.002764\n",
      "Epoch: 0600 cost = 0.002077\n",
      "Epoch: 0700 cost = 0.001579\n",
      "Epoch: 0800 cost = 0.001268\n",
      "Epoch: 0900 cost = 0.001135\n",
      "Epoch: 1000 cost = 0.001027\n"
     ]
    }
   ],
   "source": [
    "# 定义训练函数\n",
    "def train_seq2seq(model, criterion, optimizer, epochs):\n",
    "    for epoch in range(epochs):\n",
    "        encoder_input, decoder_input, target = make_batch(sentences) # 训练数据的创建\n",
    "        hidden = torch.zeros(1, encoder_input.size(0), n_hidden) # 初始化隐藏状态      \n",
    "        optimizer.zero_grad()# 梯度清零        \n",
    "        output = model(encoder_input, hidden, decoder_input) # 获取模型输出        \n",
    "        loss = criterion(output.view(-1, voc_size_en), target.view(-1)) # 计算损失        \n",
    "        if (epoch + 1) % 100 == 0: # 打印损失\n",
    "            print(f\"Epoch: {epoch + 1:04d} cost = {loss:.6f}\")         \n",
    "        loss.backward()# 反向传播        \n",
    "        optimizer.step()# 更新参数\n",
    "\n",
    "# 训练模型\n",
    "epochs = 1000 # 训练轮次\n",
    "criterion = nn.CrossEntropyLoss(ignore_index=word2idx_en['<pad>']) # 损失函数\n",
    "optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 优化器\n",
    "train_seq2seq(model, criterion, optimizer, epochs) # 调用函数训练模型"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "1d5c446d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "咖哥 喜欢 小冰 -> ['Neural', 'Net', 'so']\n",
      "自然 语言 处理 很 强大 -> ['NLP', 'is', 'so', 'so', '<eos>']\n"
     ]
    }
   ],
   "source": [
    "# 定义测试函数\n",
    "def test_seq2seq(model, source_sentence, word_dict, number_dict):\n",
    "    # 将输入句子转换为索引\n",
    "    encoder_input = np.array([[word2idx_cn[n] for n in source_sentence.split()]])\n",
    "    # 构建输出句子的索引，以'<sos>'开始，后面跟'<eos>'，长度与输入句子相同\n",
    "    decoder_input = np.array([word2idx_en['<sos>']] + [word_dict['<eos>']]*(len(encoder_input[0])-1))\n",
    "    # 转换为LongTensor\n",
    "    encoder_input = torch.LongTensor(encoder_input)\n",
    "    decoder_input = torch.LongTensor(decoder_input).unsqueeze(0) # 增加一维    \n",
    "    hidden = torch.zeros(1, encoder_input.size(0), n_hidden) # 初始化隐藏状态    \n",
    "    predict = model(encoder_input, hidden, decoder_input) # 获取模型输出    \n",
    "    predict = predict.data.max(2, keepdim=True)[1] # 获取最大概率的索引\n",
    "    # 打印输入句子和预测的句子\n",
    "    print(source_sentence, '->', [number_dict[n.item()] for n in predict.squeeze()])\n",
    "\n",
    "# 测试模型\n",
    "test_seq2seq(model, '咖哥 喜欢 小冰', word2idx_en, idx2word_en)  \n",
    "test_seq2seq(model, '自然 语言 处理 很 强大', word2idx_en, idx2word_en)  "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "eea1716c",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
